/* Copyright (C) 2014 InfiniDB, Inc.

   This program is free software; you can redistribute it and/or
   modify it under the terms of the GNU General Public License
   as published by the Free Software Foundation; version 2 of
   the License.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
   MA 02110-1301, USA. */

/*****************************************************************************
 * $Id: vss.cpp 1926 2013-06-30 21:18:14Z wweeks $
 *
 ****************************************************************************/

#include <iostream>
#include <sstream>
//#define NDEBUG
#include <cassert>

#include <sys/types.h>
#include <cerrno>
#include <fcntl.h>
#include <sys/stat.h>
#include <boost/thread.hpp>
#include <boost/scoped_ptr.hpp>


#include "rwlock.h"
#include "brmtypes.h"
#include "mastersegmenttable.h"
#include "vbbm.h"
#include "extentmap.h"
#include "cacheutils.h"
#include "IDBDataFile.h"
#include "IDBPolicy.h"

#define VSS_DLLEXPORT
#include "vss.h"
#undef VSS_DLLEXPORT

#define VSS_MAGIC_V1 0x7218db12

using namespace std;
using namespace boost;
using namespace idbdatafile;

namespace BRM
{
// Default-construct an empty/unused VSS entry: lbid == -1 marks the slot
// free, and next == -1 terminates a hash-bucket chain.
VSSEntry::VSSEntry() : lbid(-1), verID(0), vbFlag(false), locked(false), next(-1)
{
}

/*static*/
// Serializes lazy creation/swap of the process-wide VSSImpl singleton.
boost::mutex VSSImpl::fInstanceMutex;
// Guards VSS's cached attach state (vss/hashBuckets/storage) for READ lockers.
boost::mutex VSS::mutex;

/*static*/
// Process-wide singleton; created lazily by VSSImpl::makeVSSImpl().
VSSImpl* VSSImpl::fInstance = 0;

/*static*/
// Returns the process-wide VSSImpl singleton attached to shm segment 'key',
// creating it on first use or swapping the mapping when the key changed
// (i.e. the segment was resized/replaced by another process).
VSSImpl* VSSImpl::makeVSSImpl(unsigned key, off_t size, bool readOnly)
{
  boost::mutex::scoped_lock lk(fInstanceMutex);

  if (!fInstance)
  {
    // First use in this process: create and attach.
    fInstance = new VSSImpl(key, size, readOnly);
    return fInstance;
  }

  if (fInstance->fVSS.key() != key)
  {
    // Key changed underneath us: remap onto the new segment.
    BRMShmImpl newShm(key, size);
    fInstance->swapout(newShm);
  }

  idbassert(fInstance->fVSS.key() == key);
  return fInstance;
}

// Attaches to (or creates) the VSS shared-memory segment described by
// key/size; the underlying mapping is owned by fVSS.
VSSImpl::VSSImpl(unsigned key, off_t size, bool readOnly) : fVSS(key, size, readOnly)
{
}

// Starts fully detached from shared memory; lock() performs the real attach
// and fills in the cached section pointers.
VSS::VSS()
{
  fPVSSImpl = 0;
  vss = 0;
  hashBuckets = 0;
  storage = 0;
  vssShminfo = NULL;
  vssShmid = 0;
  currentVSSShmkey = -1;
  r_only = false;
}

// The shared-memory attachment is owned by the VSSImpl singleton, so there
// is nothing to release here.
VSS::~VSS()
{
}

// ported from ExtentMap
// Acquires the MST table lock for the VSS segment and (re)attaches this
// process to the current VSS shared-memory segment if it is missing, was
// never created, or was resized (key change).
//
// op == READ : takes the MST read lock; the process-local 'mutex' is held
//              while the attach state is examined/updated.
// op == WRITE: takes the MST write lock; no local mutex needed since the
//              write lock is already exclusive.
void VSS::lock(OPS op)
{
  char* shmseg;

  if (op == READ)
  {
    vssShminfo = mst.getTable_read(MasterSegmentTable::VSSSegment);
    mutex.lock();
  }
  else
    vssShminfo = mst.getTable_write(MasterSegmentTable::VSSSegment);

  // this means that either the VSS isn't attached or that it was resized
  if (!fPVSSImpl || fPVSSImpl->key() != (unsigned)vssShminfo->tableShmkey)
  {
    if (vssShminfo->allocdSize == 0)
    {
      // No segment exists yet: it must be created under the write lock.
      // A READ caller upgrades its MST lock first, then downgrades after.
      if (op == READ)
      {
        mutex.unlock();
        mst.getTable_upgrade(MasterSegmentTable::VSSSegment);

        try
        {
          growVSS();
        }
        catch (...)
        {
          release(WRITE);
          throw;
        }

        mst.getTable_downgrade(MasterSegmentTable::VSSSegment);
      }
      else
      {
        try
        {
          growVSS();
        }
        catch (...)
        {
          release(WRITE);
          throw;
        }
      }
    }
    else
    {
      // Segment exists but we're detached or on a stale key: attach and
      // recompute the section pointers.
      // Segment layout: [VSSShmsegHeader][hash buckets][entry storage]
      fPVSSImpl = VSSImpl::makeVSSImpl(vssShminfo->tableShmkey, 0);
      idbassert(fPVSSImpl);

      if (r_only)
        fPVSSImpl->makeReadOnly();

      vss = fPVSSImpl->get();
      shmseg = reinterpret_cast<char*>(vss);
      hashBuckets = reinterpret_cast<int*>(&shmseg[sizeof(VSSShmsegHeader)]);
      storage =
          reinterpret_cast<VSSEntry*>(&shmseg[sizeof(VSSShmsegHeader) + vss->numHashBuckets * sizeof(int)]);

      if (op == READ)
        mutex.unlock();
    }
  }
  else
  {
    // Already attached to the right segment; just refresh the pointers.
    vss = fPVSSImpl->get();
    shmseg = reinterpret_cast<char*>(vss);
    hashBuckets = reinterpret_cast<int*>(&shmseg[sizeof(VSSShmsegHeader)]);
    storage =
        reinterpret_cast<VSSEntry*>(&shmseg[sizeof(VSSShmsegHeader) + vss->numHashBuckets * sizeof(int)]);

    if (op == READ)
      mutex.unlock();
  }
}

// ported from ExtentMap
// Drops whichever MST table lock a matching lock(op) call acquired.
void VSS::release(OPS op)
{
  if (op != READ)
    mst.releaseTable_write(MasterSegmentTable::VSSSegment);
  else
    mst.releaseTable_read(MasterSegmentTable::VSSSegment);
}

// Initializes a freshly created segment to its empty initial-size layout:
// header fields first, then all hash buckets and storage slots set to the
// -1 "empty" sentinel.
void VSS::initShmseg()
{
  vss->capacity = VSSSTORAGE_INITIAL_SIZE / sizeof(VSSEntry);
  vss->currentSize = 0;
  vss->lockedEntryCount = 0;
  vss->LWM = 0;
  vss->numHashBuckets = VSSTABLE_INITIAL_SIZE / sizeof(int);

  // Layout: [VSSShmsegHeader][hash buckets][entry storage]
  char* base = reinterpret_cast<char*>(vss);
  int* buckets = reinterpret_cast<int*>(base + sizeof(VSSShmsegHeader));
  VSSEntry* stor =
      reinterpret_cast<VSSEntry*>(base + sizeof(VSSShmsegHeader) + vss->numHashBuckets * sizeof(int));

  for (int i = 0; i < vss->numHashBuckets; i++)
    buckets[i] = -1;

  for (int i = 0; i < vss->capacity; i++)
    stor[i].lbid = -1;
}

// assumes write lock is held
// Creates the VSS segment on first use, or grows it by one increment,
// re-hashing the old contents into the new segment and publishing the new
// key/size through the MST entry so other processes can re-attach.
void VSS::growVSS()
{
  int allocSize;
  key_t newshmkey;
  char* newshmseg;

  // First allocation gets the initial size; later ones grow by a fixed step.
  if (vssShminfo->allocdSize == 0)
    allocSize = VSS_INITIAL_SIZE;
  else
    allocSize = vssShminfo->allocdSize + VSS_INCREMENT;

  newshmkey = chooseShmkey();
  idbassert((allocSize == VSS_INITIAL_SIZE && !fPVSSImpl) || fPVSSImpl);

  if (fPVSSImpl)
  {
    // Already attached: build the bigger segment, copy/re-hash the old
    // contents into it, then swap the mapping over to the new segment.
    BRMShmImpl newShm(newshmkey, allocSize);
    newshmseg = static_cast<char*>(newShm.fMapreg.get_address());
    memset(newshmseg, 0, allocSize);
    idbassert(vss);
    VSSShmsegHeader* tmp = reinterpret_cast<VSSShmsegHeader*>(newshmseg);
    tmp->capacity = vss->capacity + VSSSTORAGE_INCREMENT / sizeof(VSSEntry);
    tmp->numHashBuckets = vss->numHashBuckets + VSSTABLE_INCREMENT / sizeof(int);
    tmp->LWM = 0;
    copyVSS(tmp);
    fPVSSImpl->swapout(newShm);
  }
  else
  {
    // First-time creation of the segment in this process.
    fPVSSImpl = VSSImpl::makeVSSImpl(newshmkey, allocSize);
    newshmseg = reinterpret_cast<char*>(fPVSSImpl->get());
    memset(newshmseg, 0, allocSize);
  }

  vss = fPVSSImpl->get();

  if (allocSize == VSS_INITIAL_SIZE)
    initShmseg();

  // Publish the new key/size so other processes notice the resize.
  vssShminfo->tableShmkey = newshmkey;
  vssShminfo->allocdSize = allocSize;

  if (r_only)
  {
    fPVSSImpl->makeReadOnly();
    vss = fPVSSImpl->get();
  }

  // Recompute the cached section pointers for the new segment layout.
  newshmseg = reinterpret_cast<char*>(vss);
  hashBuckets = reinterpret_cast<int*>(&newshmseg[sizeof(VSSShmsegHeader)]);
  storage =
      reinterpret_cast<VSSEntry*>(&newshmseg[sizeof(VSSShmsegHeader) + vss->numHashBuckets * sizeof(int)]);
}

// assumes write lock is held
// Replaces the current segment with an empty one sized for 'elementCount'
// entries, in preparation for VSS::load().  Nothing is copied; the caller
// repopulates the table from the saved image.
void VSS::growForLoad(int elementCount)
{
  int allocSize;
  key_t newshmkey;
  char* newshmseg;
  int i;

  if (elementCount < VSSSTORAGE_INITIAL_COUNT)
    elementCount = VSSSTORAGE_INITIAL_COUNT;

  /* round up to the next normal increment out of paranoia */
  if (elementCount % VSSSTORAGE_INCREMENT_COUNT)
    elementCount = ((elementCount / VSSSTORAGE_INCREMENT_COUNT) + 1) * VSSSTORAGE_INCREMENT_COUNT;

  allocSize = VSS_SIZE(elementCount);

  newshmkey = chooseShmkey();

  if (fPVSSImpl)
  {
    // isn't this the same as makeVSSImpl()?
    // Already attached: swap the mapping to a fresh segment under a new key.
    BRMShmImpl newShm(newshmkey, allocSize);
    fPVSSImpl->swapout(newShm);
  }
  else
  {
    fPVSSImpl = VSSImpl::makeVSSImpl(newshmkey, allocSize);
  }

  // Initialize the header for an empty table of the requested capacity.
  vss = fPVSSImpl->get();
  vss->capacity = elementCount;
  vss->currentSize = 0;
  vss->LWM = 0;
  vss->numHashBuckets = elementCount / 4;
  vss->lockedEntryCount = 0;
  undoRecords.clear();
  newshmseg = reinterpret_cast<char*>(vss);
  hashBuckets = reinterpret_cast<int*>(&newshmseg[sizeof(VSSShmsegHeader)]);
  storage =
      reinterpret_cast<VSSEntry*>(&newshmseg[sizeof(VSSShmsegHeader) + vss->numHashBuckets * sizeof(int)]);

  // Mark every storage slot free and every hash chain empty.
  for (i = 0; i < vss->capacity; i++)
    storage[i].lbid = -1;

  for (i = 0; i < vss->numHashBuckets; i++)
    hashBuckets[i] = -1;

  // Publish the new key/size through the MST entry.
  vssShminfo->tableShmkey = newshmkey;
  vssShminfo->allocdSize = allocSize;
}

// Assumes the write lock is held and that dest->{numHashBuckets, capacity,
// LWM} have already been set by the caller (see growVSS()).  Copies every
// live entry from the current segment into the new segment at 'dest',
// re-hashing each one against the new bucket count.
void VSS::copyVSS(VSSShmsegHeader* dest)
{
  int i;
  int* newHashtable;
  VSSEntry* newStorage;
  char* cDest = reinterpret_cast<char*>(dest);

  // copy metadata
  dest->currentSize = vss->currentSize;
  dest->lockedEntryCount = vss->lockedEntryCount;

  newHashtable = reinterpret_cast<int*>(&cDest[sizeof(VSSShmsegHeader)]);
  newStorage =
      reinterpret_cast<VSSEntry*>(&cDest[sizeof(VSSShmsegHeader) + dest->numHashBuckets * sizeof(int)]);

  // initialize new storage & hash
  for (i = 0; i < dest->numHashBuckets; i++)
    newHashtable[i] = -1;

  for (i = 0; i < dest->capacity; i++)
    newStorage[i].lbid = -1;

  // Walk the full storage array and re-hash every live entry.  The scan must
  // cover vss->capacity, not vss->currentSize: removeEntry() leaves holes, so
  // live entries can sit at indices >= currentSize.  (In the current call
  // path growVSS() only runs when currentSize == capacity, where the two
  // bounds coincide, but scanning capacity is correct in all cases.)
  for (i = 0; i < vss->capacity; i++)
    if (storage[i].lbid != -1)
    {
      _insert(storage[i], dest, newHashtable, newStorage, true);
      // confirmChanges();
    }
}

// Picks the shm key for the next VSS segment: normally the current key + 1,
// wrapping back to the first non-reserved key in the VSS range when the
// current key is out of range or about to run off the end of it.
key_t VSS::chooseShmkey() const
{
  const int fixedKeys = 1;  // number of reserved keys at the base of the range
  const key_t current = vssShminfo->tableShmkey;

  const bool belowRange = (unsigned)current < fShmKeys.KEYRANGE_VSS_BASE;
  const bool atRangeEnd =
      current + 1 == (key_t)(fShmKeys.KEYRANGE_VSS_BASE + fShmKeys.KEYRANGE_SIZE - 1);

  if (belowRange || atRangeEnd)
    return fShmKeys.KEYRANGE_VSS_BASE + fixedKeys;

  return current + 1;
}

// Adds a (lbid, verID) entry to the VSS, growing the table first if it is
// full.  'locked' entries bump the locked-entry counter; when 'loading' is
// set (VSS::load path) no undo records are written.
// Assumes the write lock is held.
void VSS::insert(LBID_t lbid, VER_t verID, bool vbFlag, bool locked, bool loading)
{
#ifdef BRM_DEBUG

  if (lbid < 0)
  {
    log("VSS::insert(): lbid must be >= 0", logging::LOG_TYPE_DEBUG);
    throw invalid_argument("VSS::insert(): lbid must be >= 0");
  }

  if (verID < 0)
  {
    log("VSS::insert(): verID must be >= 0", logging::LOG_TYPE_DEBUG);
    throw invalid_argument("VSS::insert(): verID must be >= 0");
  }

#endif

  // Grow first so _insert() is guaranteed a free slot.
  if (vss->currentSize == vss->capacity)
    growVSS();

  VSSEntry entry;
  entry.lbid = lbid;
  entry.verID = verID;
  entry.vbFlag = vbFlag;
  entry.locked = locked;

  _insert(entry, vss, hashBuckets, storage, loading);

  if (!loading)
    makeUndoRecord(&vss->currentSize, sizeof(vss->currentSize));

  vss->currentSize++;

  if (locked)
    vss->lockedEntryCount++;
}

// assumes write lock is held and that it is properly sized already
// metadata is modified by the caller
// Places 'e' into the first free storage slot at or above the low-water mark
// and pushes it onto the front of its hash bucket's chain.  Undo records are
// written (unless 'loading') before each piece of shared state is modified.
void VSS::_insert(VSSEntry& e, VSSShmsegHeader* dest, int* destHash, VSSEntry* destStorage, bool loading)
{
  const int hashIndex = hasher((char*)&e.lbid, sizeof(e.lbid)) % dest->numHashBuckets;

  // Linear probe from the low-water mark for the first free slot.
  int slot = dest->LWM;

  while (destStorage[slot].lbid != -1)
  {
    ++slot;
#ifdef BRM_DEBUG

    if (slot == dest->capacity)
    {
      log("VSS:_insert(): There are no empty entries. Check resize condition.", logging::LOG_TYPE_DEBUG);
      throw logic_error("VSS:_insert(): There are no empty entries. Check resize condition.");
    }

#endif
  }

  if (!loading)
    makeUndoRecord(dest, sizeof(VSSShmsegHeader));

  dest->LWM = slot + 1;

  if (!loading)
  {
    makeUndoRecord(&destStorage[slot], sizeof(VSSEntry));
    makeUndoRecord(&destHash[hashIndex], sizeof(int));
  }

  // Chain push-front: new entry becomes the bucket head.
  e.next = destHash[hashIndex];
  destStorage[slot] = e;
  destHash[hashIndex] = slot;
}

// assumes read lock is held
// Resolves which version of 'lbid' a query running at verInfo.currentScn
// (in transaction txnID) should read.  On success returns 0 and sets
// *outVer/*vbFlag; returns ERR_SNAPSHOT_TOO_OLD when every tracked version
// is newer than the query's SCN, and -1 when no usable entry exists.
// vbOnly restricts the search to entries already in the version buffer.
int VSS::lookup(LBID_t lbid, const QueryContext_vss& verInfo, VER_t txnID, VER_t* outVer, bool* vbFlag,
                bool vbOnly) const
{
  int hashIndex, maxVersion = -1, minVersion = -1, currentIndex;
  VSSEntry *listEntry, *maxEntry = NULL;

#ifdef BRM_DEBUG

  if (lbid < 0)
  {
    log("VSS::lookup(): lbid must be >= 0", logging::LOG_TYPE_DEBUG);
    throw invalid_argument("VSS::lookup(): lbid must be >= 0");
  }

  if (verInfo.currentScn < 0)
  {
    log("VSS::lookup(): verID must be >= 0", logging::LOG_TYPE_DEBUG);
    throw invalid_argument("VSS::lookup(): verID must be >= 0");
  }

  if (txnID < 0)
  {
    log("VSS::lookup(): txnID must be >= 0", logging::LOG_TYPE_DEBUG);
    throw invalid_argument("VSS::lookup(): txnID must be >= 0");
  }

#endif

  hashIndex = hasher((char*)&lbid, sizeof(lbid)) % vss->numHashBuckets;

  currentIndex = hashBuckets[hashIndex];

  // Walk the bucket's chain, considering only entries for this lbid.
  while (currentIndex != -1)
  {
    listEntry = &storage[currentIndex];

    if (listEntry->lbid == lbid)
    {
      if (vbOnly && !listEntry->vbFlag)
        goto next;

      /* "if it is/was part of a different transaction, ignore it"
       */
      if (txnID != listEntry->verID && (verInfo.txns->find(listEntry->verID) != verInfo.txns->end()))
        goto next;

      /* fast exit if the exact version is found */
      if (verInfo.currentScn == listEntry->verID)
      {
        *outVer = listEntry->verID;
        *vbFlag = listEntry->vbFlag;
        return 0;
      }

      /* Track the min version of this LBID */
      if (minVersion > listEntry->verID || minVersion == -1)
        minVersion = listEntry->verID;

      /* Pick the highest version <= the SCN of the query */
      if (verInfo.currentScn > listEntry->verID && maxVersion < listEntry->verID)
      {
        maxVersion = listEntry->verID;
        maxEntry = listEntry;
      }
    }

  next:
    currentIndex = listEntry->next;
  }

  // Best candidate <= the query SCN wins.
  if (maxEntry != NULL)
  {
    *outVer = maxVersion;
    *vbFlag = maxEntry->vbFlag;
    return 0;
  }
  // Entries exist but all are newer than the query SCN: snapshot too old.
  else if (minVersion > verInfo.currentScn)
  {
    *outVer = 0;
    *vbFlag = false;
    return ERR_SNAPSHOT_TOO_OLD;
  }

  // No entry for this lbid at all.
  *outVer = 0;
  *vbFlag = false;
  return -1;
}

// Returns the version of the current (not-in-version-buffer) entry for
// 'lbid', or 0 if there is none.  When isLocked is non-NULL it receives the
// entry's lock state (false if no entry was found).
VER_t VSS::getCurrentVersion(LBID_t lbid, bool* isLocked) const
{
  const int bucket = hasher((char*)&lbid, sizeof(lbid)) % vss->numHashBuckets;

  for (int idx = hashBuckets[bucket]; idx != -1; idx = storage[idx].next)
  {
    const VSSEntry& entry = storage[idx];

    if (entry.lbid == lbid && !entry.vbFlag)
    {
      if (isLocked != NULL)
        *isLocked = entry.locked;

      return entry.verID;
    }
  }

  if (isLocked != NULL)
    *isLocked = false;

  return 0;
}

// Returns the highest version of 'lbid' that is in the version buffer and
// is <= 'max', or -1 if no such entry exists.
VER_t VSS::getHighestVerInVB(LBID_t lbid, VER_t max) const
{
  VER_t best = -1;
  const int bucket = hasher((char*)&lbid, sizeof(lbid)) % vss->numHashBuckets;

  for (int idx = hashBuckets[bucket]; idx != -1; idx = storage[idx].next)
  {
    const VSSEntry& e = storage[idx];

    if (e.lbid == lbid && e.vbFlag && e.verID <= max && e.verID > best)
      best = e.verID;
  }

  return best;
}

// Returns true iff the (lbid, version) entry exists AND is in the version
// buffer; false when it is absent or still the current copy.
bool VSS::isVersioned(LBID_t lbid, VER_t version) const
{
  const int bucket = hasher((char*)&lbid, sizeof(lbid)) % vss->numHashBuckets;

  for (int idx = hashBuckets[bucket]; idx != -1; idx = storage[idx].next)
    if (storage[idx].lbid == lbid && storage[idx].verID == version)
      return storage[idx].vbFlag;

  return false;
}

bool VSS::isLocked(const LBIDRange& range, VER_t transID) const
{
  int hashIndex, currentIndex;
  LBID_t currentBlock;
  VSSEntry* listEntry;

  for (currentBlock = range.start; currentBlock < range.start + range.size; currentBlock++)
  {
    hashIndex = hasher((char*)&currentBlock, sizeof(currentBlock)) % vss->numHashBuckets;

    currentIndex = hashBuckets[hashIndex];

    while (currentIndex != -1)
    {
      listEntry = &storage[currentIndex];

      if (listEntry->lbid == currentBlock && listEntry->locked)
      {
        if (listEntry->verID == transID)
          return false;
        else
          return true;
      }

      currentIndex = listEntry->next;
    }
  }

  return false;
}

// requires write lock
// Removes the (lbid, verID) entry.  Afterwards, if no remaining entry for
// that lbid is locked or points into the version buffer, every other entry
// for the lbid is also removed and the lbid is appended to *flushList so
// callers can invalidate cached copies.
void VSS::removeEntry(LBID_t lbid, VER_t verID, vector<LBID_t>* flushList)
{
  int index, prev, bucket;

#ifdef BRM_DEBUG

  if (lbid < 0)
  {
    log("VSS::removeEntry(): lbid must be >= 0", logging::LOG_TYPE_DEBUG);
    throw invalid_argument("VSS::removeEntry(): lbid must be >= 0");
  }

  if (verID < 0)
  {
    log("VSS::removeEntry(): verID must be >= 0", logging::LOG_TYPE_DEBUG);
    throw invalid_argument("VSS::removeEntry(): verID must be >= 0");
  }

#endif

  index = getIndex(lbid, verID, prev, bucket);

  if (index == -1)
  {
#ifdef BRM_DEBUG
    ostringstream ostr;

    ostr << "VSS::removeEntry(): that entry doesn't exist lbid = " << lbid << " verID = " << verID;
    log(ostr.str(), logging::LOG_TYPE_DEBUG);
    throw logic_error(ostr.str());
#else
    // Silently tolerate double-removal in production builds.
    return;
#endif
  }

  // Mark the slot free, then unlink it from its bucket's chain.
  makeUndoRecord(&storage[index], sizeof(VSSEntry));
  storage[index].lbid = -1;

  if (prev != -1)
  {
    makeUndoRecord(&storage[prev], sizeof(VSSEntry));
    storage[prev].next = storage[index].next;
  }
  else
  {
    makeUndoRecord(&hashBuckets[bucket], sizeof(int));
    hashBuckets[bucket] = storage[index].next;
  }

  makeUndoRecord(vss, sizeof(VSSShmsegHeader));
  vss->currentSize--;

  // Note: the slot's 'locked' flag is still intact (only lbid was cleared).
  if (storage[index].locked && (vss->lockedEntryCount > 0))
    vss->lockedEntryCount--;

  // Keep the low-water mark pointing at the lowest free slot.
  if (index < vss->LWM)
    vss->LWM = index;

  // scan the list of entries with that lbid to see if there are others
  // to remove.
  for (index = hashBuckets[bucket]; index != -1; index = storage[index].next)
    if (storage[index].lbid == lbid && (storage[index].vbFlag || storage[index].locked))
      return;

  // if execution gets here, we should be able to remove all entries
  // with the given lbid because none point to the VB.
  for (prev = -1, index = hashBuckets[bucket]; index != -1; index = storage[index].next)
  {
    if (storage[index].lbid == lbid)
    {
      makeUndoRecord(&storage[index], sizeof(VSSEntry));
      storage[index].lbid = -1;

      if (prev == -1)
      {
        makeUndoRecord(&hashBuckets[bucket], sizeof(int));
        hashBuckets[bucket] = storage[index].next;
      }
      else
      {
        makeUndoRecord(&storage[prev], sizeof(VSSEntry));
        storage[prev].next = storage[index].next;
      }

      vss->currentSize--;

      if (storage[index].locked && (vss->lockedEntryCount > 0))
        vss->lockedEntryCount--;

      if (index < vss->LWM)
        vss->LWM = index;
    }
    else
      prev = index;
  }

  flushList->push_back(lbid);
}

// Returns true if the requested version of 'lbid' has already aged out of
// the version buffer, i.e. verID is older than every version the VSS still
// tracks for that lbid.  (Min-version tracking mirrors VSS::lookup().)
bool VSS::isTooOld(LBID_t lbid, VER_t verID) const
{
  int index, bucket;
  VER_t minVer = -1;  // -1 == "no entry for this lbid seen yet"
  VSSEntry* listEntry;

  bucket = hasher((char*)&lbid, sizeof(lbid)) % vss->numHashBuckets;

  index = hashBuckets[bucket];

  while (index != -1)
  {
    listEntry = &storage[index];

    // Track the minimum version recorded for this lbid.  The previous code
    // initialized minVer to 0 and only lowered it, so with non-negative
    // version IDs the minimum was never captured and this function always
    // returned false.
    if (listEntry->lbid == lbid && (minVer > listEntry->verID || minVer == -1))
      minVer = listEntry->verID;

    index = listEntry->next;
  }

  // No entries (minVer == -1) never counts as too old for verID >= 0.
  return (minVer > verID);
}

// Returns true iff 'verID' is the version of 'lbid' that a rollback would
// restore AND some entry for the lbid is currently locked.  See bug 3287
// for the alternative semantics preserved in the comment blocks below.
bool VSS::isEntryLocked(LBID_t lbid, VER_t verID) const
{
  int index;
  int bucket;

  if (lbid == -1)
    return false;

  /* See bug 3287 for info on these code blocks */

  /*  This version checks for the specific lbid,ver pair
      index = getIndex(lbid, verID, prev, bucket);
      if (index < 0)
              return false;
      return storage[index].locked;
  */

  /*  This version considers any locked entry for an LBID to mean the block is locked.
      VSSEntry *listEntry;
      bucket = hasher((char *) &lbid, sizeof(lbid)) % vss->numHashBuckets;

      index = hashBuckets[bucket];
      while (index != -1) {
              listEntry = &storage[index];
              if (listEntry->lbid == lbid && listEntry->locked)
              {
                      ostringstream msg;
                      msg << " Locked entry information lbid:verID = " << listEntry->lbid << ":" <<
     listEntry->verID << endl; log(msg.str(), logging::LOG_TYPE_CRITICAL); return true;
              }
              index = listEntry->next;
      }
      return false;
  */

  // This version considers the blocks needed for rollback to be locked,
  // otherwise they're unlocked.  Note, the version with the 'locked' flag set
  // will never be a candidate for aging out b/c it's not in the version buffer.
  // TODO:  Update this when we support multiple transactions at once.  Need to
  // identify ALL versions needed for rollback.

  VSSEntry* listEntry;
  bool hasALockedEntry = false;
  VER_t rollbackVersion = 0;

  bucket = hasher((char*)&lbid, sizeof(lbid)) % vss->numHashBuckets;

  index = hashBuckets[bucket];

  // One pass: note whether any entry is locked, and find the highest
  // unlocked version (the one a rollback would restore).
  while (index != -1)
  {
    listEntry = &storage[index];

    if (listEntry->lbid == lbid)
    {
      if (listEntry->locked)
        hasALockedEntry = true;
      else if (rollbackVersion < listEntry->verID)
        rollbackVersion = listEntry->verID;
    }

    index = listEntry->next;
  }

  return (hasALockedEntry && verID == rollbackVersion);
}

// read lock
// Finds the storage index of the (lbid, verID) entry, or -1 if absent.
// On return 'prev' is the index of the preceding chain node (-1 when the
// entry is the bucket head) and 'bucket' is the hash bucket examined —
// both are set even on failure so callers can unlink/insert.
int VSS::getIndex(LBID_t lbid, VER_t verID, int& prev, int& bucket) const
{
  prev = -1;
  bucket = hasher((char*)&lbid, sizeof(lbid)) % vss->numHashBuckets;

  for (int idx = hashBuckets[bucket]; idx != -1; idx = storage[idx].next)
  {
    if (storage[idx].lbid == lbid && storage[idx].verID == verID)
      return idx;

    prev = idx;
  }

  return -1;
}

// write lock
// Sets the in-version-buffer flag of the (lbid, verID) entry, writing an
// undo record first.  Throws logic_error if the entry does not exist.
void VSS::setVBFlag(LBID_t lbid, VER_t verID, bool vbFlag)
{
#ifdef BRM_DEBUG

  if (lbid < 0)
  {
    log("VSS::setVBFlag(): lbid must be >= 0", logging::LOG_TYPE_DEBUG);
    throw invalid_argument("VSS::setVBFlag(): lbid must be >= 0");
  }

  if (verID < 0)
  {
    log("VSS::setVBFlag(): verID must be >= 0", logging::LOG_TYPE_DEBUG);
    throw invalid_argument("VSS::setVBFlag(): verID must be >= 0");
  }

#endif

  int prev, bucket;
  const int index = getIndex(lbid, verID, prev, bucket);

  if (index == -1)
  {
    ostringstream ostr;
    ostr << "VSS::setVBFlag(): that entry doesn't exist lbid=" << lbid << " ver=" << verID;
    log(ostr.str(), logging::LOG_TYPE_DEBUG);
    throw logic_error(ostr.str());
  }

  makeUndoRecord(&storage[index], sizeof(VSSEntry));
  storage[index].vbFlag = vbFlag;
}

// write lock
// Commits transaction 'txnID': clears the 'locked' flag on every entry the
// transaction wrote, decrementing the locked-entry counter for each one.
void VSS::commit(VER_t txnID)
{
#ifdef BRM_DEBUG

  if (txnID < 1)
  {
    log("VSS::commit(): txnID must be > 0", logging::LOG_TYPE_DEBUG);
    throw invalid_argument("VSS::commit(): txnID must be > 0");
  }

#endif

  for (int i = 0; i < vss->capacity; i++)
  {
    // Only live entries written by this transaction are affected.
    if (storage[i].lbid == -1 || storage[i].verID != txnID)
      continue;

#ifdef BRM_DEBUG

    if (storage[i].locked != true)
    {
      ostringstream ostr;
      ostr << "VSS::commit(): An entry has already been unlocked..? txnId = " << txnID;
      log(ostr.str(), logging::LOG_TYPE_DEBUG);
      throw logic_error(ostr.str());
    }

#endif

    makeUndoRecord(&storage[i], sizeof(VSSEntry));
    storage[i].locked = false;

    // @bug 1426 fix: keep the counter in sync as each lock is released.
    if (vss->lockedEntryCount > 0)
      vss->lockedEntryCount--;
  }
}

// read lock
// Fills 'lbids' with every LBID whose pending (uncommitted) version belongs
// to transaction 'txnID'.  The output vector is cleared first.
void VSS::getUncommittedLBIDs(VER_t txnID, vector<LBID_t>& lbids)
{
  lbids.clear();

#ifdef BRM_DEBUG

  if (txnID < 1)
  {
    log("VSS::getUncommittedLBIDs(): txnID must be > 0", logging::LOG_TYPE_DEBUG);
    throw invalid_argument("VSS::getUncommittedLBIDs(): txnID must be > 0");
  }

#endif

  for (int i = 0; i < vss->capacity; i++)
  {
    if (storage[i].lbid == -1 || storage[i].verID != txnID)
      continue;

#ifdef BRM_DEBUG

    if (storage[i].locked == false)
    {
      log("VSS::getUncommittedLBIDs(): found an unlocked block with that TxnID", logging::LOG_TYPE_DEBUG);
      throw logic_error("VSS::getUncommittedLBIDs(): found an unlocked block with that TxnID");
    }

    if (storage[i].vbFlag == true)
    {
      log("VSS::getUncommittedLBIDs(): found a block with that TxnID in the VB", logging::LOG_TYPE_DEBUG);
      throw logic_error("VSS::getUncommittedLBIDs(): found a block with that TxnID in the VB");
    }

#endif

    lbids.push_back(storage[i].lbid);
  }
}

// Collects every live, unlocked (lbid, version) pair.  Clears 'lbids' first.
void VSS::getUnlockedLBIDs(BlockList_t& lbids)
{
  lbids.clear();

  for (int i = 0; i < vss->capacity; i++)
  {
    const VSSEntry& e = storage[i];

    if (e.lbid != -1 && !e.locked)
      lbids.push_back(LVP_t(e.lbid, e.verID));
  }
}

// Collects every live, locked (lbid, version) pair.  Clears 'lbids' first.
void VSS::getLockedLBIDs(BlockList_t& lbids)
{
  lbids.clear();

  for (int i = 0; i < vss->capacity; i++)
  {
    const VSSEntry& e = storage[i];

    if (e.lbid != -1 && e.locked)
      lbids.push_back(LVP_t(e.lbid, e.verID));
  }
}
// write lock
/* Rewritten on 6/2/10 to be O(n) with the size of range rather than
 * O(nlogn) with VSS capacity. */
// Removes every VSS entry for every LBID in 'range'.  Entries that are in
// the version buffer are also removed from the VBBM when use_vbbm is set.
void VSS::removeEntriesFromDB(const LBIDRange& range, VBBM& vbbm, bool use_vbbm)
{
  int bucket, index, prev;
  LBID_t lastLBID, lbid;

#ifdef BRM_DEBUG

  if (range.start < 0)
  {
    log("VSS::removeEntriesFromDB(): lbids must be positive.", logging::LOG_TYPE_DEBUG);
    throw invalid_argument("VSS::removeEntriesFromDB(): lbids must be positive.");
  }

  if (range.size < 1)
  {
    log("VSS::removeEntriesFromDB(): size must be > 0", logging::LOG_TYPE_DEBUG);
    throw invalid_argument("VSS::removeEntriesFromDB(): size must be > 0");
  }

#endif

  makeUndoRecord(vss, sizeof(VSSShmsegHeader));

  lastLBID = range.start + range.size - 1;

  for (lbid = range.start; lbid <= lastLBID; lbid++)
  {
    bucket = hasher((char*)&lbid, sizeof(lbid)) % vss->numHashBuckets;

    // Walk the bucket's chain, unlinking matching entries in place.
    // (storage[index].next is still valid after the slot is cleared,
    // so advancing via it in the for-update is safe.)
    for (prev = -1, index = hashBuckets[bucket]; index != -1; index = storage[index].next)
    {
      if (storage[index].lbid == lbid)
      {
        if (storage[index].vbFlag && use_vbbm)
          vbbm.removeEntry(storage[index].lbid, storage[index].verID);

        makeUndoRecord(&storage[index], sizeof(VSSEntry));
        storage[index].lbid = -1;

        if (prev == -1)
        {
          makeUndoRecord(&hashBuckets[bucket], sizeof(int));
          hashBuckets[bucket] = storage[index].next;
        }
        else
        {
          makeUndoRecord(&storage[prev], sizeof(VSSEntry));
          storage[prev].next = storage[index].next;
        }

        vss->currentSize--;

        // The slot's 'locked' flag is intact (only lbid was cleared).
        if (storage[index].locked && (vss->lockedEntryCount > 0))
          vss->lockedEntryCount--;

        // Keep the low-water mark at the lowest free slot.
        if (index < vss->LWM)
          vss->LWM = index;
      }
      else
        prev = index;
    }
  }
}

// Counts the live entries and cross-checks the result against the header's
// currentSize, throwing logic_error on a mismatch.  Doubles as a
// consistency check (see checkConsistency() test 2a).
int VSS::size() const
{
  int used = 0;

  for (int i = 0; i < vss->capacity; i++)
    if (storage[i].lbid != -1)
      used++;

  if (used != vss->currentSize)
  {
    ostringstream ostr;

    ostr << "VSS: actual size & recorded size disagree.  actual size = " << used
         << " recorded size = " << vss->currentSize;
    log(ostr.str(), logging::LOG_TYPE_DEBUG);
    throw logic_error(ostr.str());
  }

  return used;
}

// Returns true iff every hash bucket is empty (no entry reachable).
bool VSS::hashEmpty() const
{
  for (int bucket = 0; bucket < vss->numHashBuckets; bucket++)
    if (hashBuckets[bucket] != -1)
      return false;

  return true;
}

// Resets the VSS to an empty, initial-size segment under a fresh shm key
// and publishes the new key/size through the MST entry.
// Assumes the write lock is held.
void VSS::clear()
{
  int allocSize;
  key_t newshmkey;
  char* newshmseg;

  allocSize = VSS_INITIAL_SIZE;

  newshmkey = chooseShmkey();

  idbassert(fPVSSImpl);
  idbassert(fPVSSImpl->key() != (unsigned)newshmkey);
  // Swap the mapping over to a fresh segment of the initial size.
  fPVSSImpl->clear(newshmkey, allocSize);
  vssShminfo->tableShmkey = newshmkey;
  vssShminfo->allocdSize = allocSize;
  vss = fPVSSImpl->get();
  initShmseg();

  if (r_only)
  {
    fPVSSImpl->makeReadOnly();
    vss = fPVSSImpl->get();
  }

  // Recompute the cached section pointers for the new segment.
  newshmseg = reinterpret_cast<char*>(vss);
  hashBuckets = reinterpret_cast<int*>(&newshmseg[sizeof(VSSShmsegHeader)]);
  storage =
      reinterpret_cast<VSSEntry*>(&newshmseg[sizeof(VSSShmsegHeader) + vss->numHashBuckets * sizeof(int)]);
}

// read lock
// Structural self-check; throws logic_error on the first inconsistency
// found, returns 0 when everything passes.
int VSS::checkConsistency(const VBBM& vbbm, ExtentMap& em) const
{
  /*
  1. Every valid entry in the VSS has an entry either in the VBBM or in the
  EM.  Verify that.
  2. Struct consistency checks
      a. current size agrees with actual # of used entries
      b. there are no empty elements in the hashed lists
      c. each hash table entry points to a non-empty element or -1
      d. verify that there are no empty entries below the LWM
      e. verify uniqueness of the entries

  */

  int i, j, k, err;

  /* Test 1 */

  OID_t oid;
  uint32_t fbo;

  for (i = 0; i < vss->capacity; i++)
  {
    if (storage[i].lbid != -1)
    {
      if (storage[i].vbFlag)
      {
        // Entries flagged as in the version buffer must exist in the VBBM.
        err = vbbm.lookup(storage[i].lbid, storage[i].verID, oid, fbo);

        if (err != 0)
        {
          cerr << "VSS: lbid=" << storage[i].lbid << " verID=" << storage[i].verID
               << " vbFlag=true isn't in the VBBM" << endl;
          throw logic_error("VSS::checkConsistency(): a VSS entry with vbflag set is not in the VBBM");
        }
      }
      else
      {
// This version of em.lookup was made obsolete with multiple files per OID.
// If ever want to really use this checkConsistency() function, this section
// of code needs to be updated to use the new lookup() API.
#if 0
                err = em.lookup(storage[i].lbid, (int&)oid, fbo);

                if (err != 0)
                {
                    cerr << "VSS: lbid=" << storage[i].lbid << " verID=" <<
                         storage[i].verID << " vbFlag=false has no extent" <<
                         endl;
                    throw logic_error("VSS::checkConsistency(): a VSS entry with vbflag unset is not in the EM");
                }

#endif
      }
    }
  }

  /* Test 2a is already implemented */
  // size() throws if currentSize disagrees with the live-entry count.
  size();

  /* Tests 2b & 2c - no empty elements reachable from the hash table */

  int nextElement;

  for (i = 0; i < vss->numHashBuckets; i++)
  {
    if (hashBuckets[i] != -1)
      for (nextElement = hashBuckets[i]; nextElement != -1; nextElement = storage[nextElement].next)
        if (storage[nextElement].lbid == -1)
          throw logic_error(
              "VSS::checkConsistency(): an empty storage entry is reachable from the hash table");
  }

  /* Test 2d - verify that there are no empty entries below the LWM */

  for (i = 0; i < vss->LWM; i++)
    if (storage[i].lbid == -1)
    {
      cerr << "VSS: LWM=" << vss->LWM << " first empty entry=" << i << endl;
      throw logic_error("VSS::checkConsistency(): LWM accounting error");
    }

  /* Test 2e - verify uniqueness of each entry */

  // Duplicates must share a bucket, so comparing each chain node against
  // its successors covers all possible duplicate pairs.
  for (i = 0; i < vss->numHashBuckets; i++)
    if (hashBuckets[i] != -1)
      for (j = hashBuckets[i]; j != -1; j = storage[j].next)
        for (k = storage[j].next; k != -1; k = storage[k].next)
          if (storage[j].lbid == storage[k].lbid && storage[j].verID == storage[k].verID)
          {
            cerr << "VSS: lbid=" << storage[j].lbid << " verID=" << storage[j].verID << endl;
            throw logic_error("VSS::checkConsistency(): Duplicate entry found");
          }

  return 0;
}

// Marks this accessor read-only; subsequent (re)attaches in lock(),
// growVSS() and clear() will map the segment read-only via makeReadOnly().
void VSS::setReadOnly()
{
  r_only = true;
}

// Adds the transaction ID of every locked entry to 'list'.  (For a locked
// entry, verID holds the owning transaction's ID.)  'list' is not cleared,
// so results accumulate across calls.
void VSS::getCurrentTxnIDs(set<VER_t>& list) const
{
  for (int i = 0; i < vss->capacity; i++)
  {
    const VSSEntry& e = storage[i];

    if (e.lbid != -1 && e.locked)
      list.insert(e.verID);
  }
}

/* File Format:

                VSS V1 magic (32-bits)
                # of VSS entries in capacity (32-bits)
                struct VSSEntry * #
*/

// On-disk image header written/read by VSS::save()/VSS::load().
struct Header
{
  int magic;    // expected to be VSS_MAGIC_V1
  int entries;  // number of live VSSEntry records that follow
};

// read lock
void VSS::save(string filename)
{
  int i;
  struct Header header;

  const char* filename_p = filename.c_str();
  scoped_ptr<IDBDataFile> out(IDBDataFile::open(IDBPolicy::getType(filename_p, IDBPolicy::WRITEENG),
                                                filename_p, "wb", IDBDataFile::USE_VBUF));

  if (!out)
  {
    log_errno("VSS::save()");
    throw runtime_error("VSS::save(): Failed to open the file");
  }

  header.magic = VSS_MAGIC_V1;
  header.entries = vss->currentSize;

  if (out->write((char*)&header, sizeof(header)) != sizeof(header))
  {
    log_errno("VSS::save()");
    throw runtime_error("VSS::save(): Failed to write header to the file");
  }

  int first = -1, last = -1, err;
  size_t progress, writeSize;
  for (i = 0; i < vss->capacity; i++)
  {
    if (storage[i].lbid != -1 && first == -1)
      first = i;
    else if (storage[i].lbid == -1 && first != -1)
    {
      last = i;
      writeSize = (last - first) * sizeof(VSSEntry);
      progress = 0;
      char* writePos = (char*)&storage[first];
      while (progress < writeSize)
      {
        err = out->write(writePos + progress, writeSize - progress);
        if (err < 0)
        {
          log_errno("VSS::save()");
          throw runtime_error("VSS::save(): Failed to write the file");
        }
        progress += err;
      }
      first = -1;
    }
  }
  if (first != -1)
  {
    writeSize = (vss->capacity - first) * sizeof(VSSEntry);
    progress = 0;
    char* writePos = (char*)&storage[first];
    while (progress < writeSize)
    {
      err = out->write(writePos + progress, writeSize - progress);
      if (err < 0)
      {
        log_errno("VSS::save()");
        throw runtime_error("VSS::save(): Failed to write the file");
      }
      progress += err;
    }
  }
}

// Ideally, we'd like to get in and out of this function as quickly as possible.
bool VSS::isEmpty(bool useLock)
{
#if 0

    if (fPVSSImpl == 0 || fPVSSImpl->key() != (unsigned)mst.getVSSShmkey())
    {
        lock(READ);
        release(READ);
    }

    //this really should be done under a read lock. There's a small chance that between the release()
    // above and now that the underlying SHM could be changed. This would be exacerbated during
    // high DML/query activity.
    return (fPVSSImpl->get()->currentSize == 0);
#endif
  // Should be race-free, but takes along time...
  bool rc;

  if (useLock)
    lock(READ);

  rc = (fPVSSImpl->get()->currentSize == 0);

  if (useLock)
    release(READ);

  return rc;
}

//#include "boost/date_time/posix_time/posix_time.hpp"
// using namespace boost::posix_time;

void VSS::load(string filename)
{
  int i;
  struct Header header;
  struct VSSEntry entry;
  // ptime time1, time2;

  // time1 = microsec_clock::local_time();
  // cout << "loading the VSS " << time1 << endl;
  const char* filename_p = filename.c_str();
  scoped_ptr<IDBDataFile> in(
      IDBDataFile::open(IDBPolicy::getType(filename_p, IDBPolicy::WRITEENG), filename_p, "rb", 0));

  if (!in)
  {
    log_errno("VSS::load()");
    throw runtime_error("VSS::load(): Failed to open the file");
  }

  if (in->read((char*)&header, sizeof(header)) != sizeof(header))
  {
    log_errno("VSS::load()");
    throw runtime_error("VSS::load(): Failed to read header");
  }

  if (header.magic != VSS_MAGIC_V1)
  {
    log("VSS::load(): Bad magic.  Not a VSS file?");
    throw runtime_error("VSS::load(): Bad magic.  Not a VSS file?");
  }

  if (header.entries < 0)
  {
    log("VSS::load(): Bad size.  Not a VSS file?");
    throw runtime_error("VSS::load(): Bad size.  Not a VSS file?");
  }

  growForLoad(header.entries);

  /*
      for (i = 0; i < vss->capacity; i++)
              storage[i].lbid = -1;
      for (i = 0; i < vss->numHashBuckets; i++)
              hashBuckets[i] = -1;
      vss->currentSize = 0;
      vss->lockedEntryCount = 0;
      vss->LWM = 0;
  */

  size_t readSize = header.entries * sizeof(entry);
  char* readBuf = new char[readSize];
  size_t progress = 0;
  int err;
  while (progress < readSize)
  {
    err = in->read(readBuf + progress, readSize - progress);
    if (err < 0)
    {
      log_errno("VBBM::load()");
      throw runtime_error("VBBM::load(): Failed to load, check the critical log file");
    }
    else if (err == 0)
    {
      log("VBBM::load(): Got early EOF");
      throw runtime_error("VBBM::load(): Got early EOF");
    }
    progress += err;
  }

  VSSEntry* loadedEntries = (VSSEntry*)readBuf;
  for (i = 0; i < header.entries; i++)
    insert(loadedEntries[i].lbid, loadedEntries[i].verID, loadedEntries[i].vbFlag, loadedEntries[i].locked,
           true);

  // time2 = microsec_clock::local_time();
  // cout << "done loading " << time2 << " duration: " << time2-time1 << endl;
}

#ifdef BRM_DEBUG
// read lock
int VSS::getShmid() const
{
  return vssShmid;
}
#endif

QueryContext_vss::QueryContext_vss(const QueryContext& qc) : currentScn(qc.currentScn)
{
  txns.reset(new set<VER_t>());

  for (uint32_t i = 0; i < qc.currentTxns->size(); i++)
    txns->insert((*qc.currentTxns)[i]);
}

}  // namespace BRM
